package cn.ant.mqHandle;

import cn.ant.config.RabbitMQConfig;
import cn.ant.entity.MessageSendDTO;
import com.alibaba.fastjson2.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * 队列的消费者；这是监听队列A的消费者（A、B）
 *
 * @author Anhui AntLaddie <a href="https://juejin.cn/user/4092805620185316">(掘金蚂蚁小哥)</a>
 * @version 1.0
 **/
@Slf4j
@Component
public class QueueConsumer {

    /***
     * 消费者A（监听）队列queueAName
     * @param msgData 传递的具体消息，最好是生产者发送使用什么类型，这里接收就用什么类型
     * @param deliveryTag 处理消息的编号（可通过message.getMessageProperties().getDeliveryTag()拿到）
     * @param message 这个就类似我们原生的message
     * @param channel 这个就类似我们原生的channel
     */
    // @RabbitListener监听队列；若监听多个则在queues的{}里面逗号分割；
    // ackMode设置应答模式优先级是比yml上配置的acknowledge-mode优先级高
    @RabbitListener(queues = {RabbitMQConfig.QUEUE_A_NAME}, ackMode = "MANUAL")
    public void messageSimpleHandleA(@Payload String msgData, // 这个是生产者发送的JSON消息
                                     @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, // 处理消息的编号
                                     Message message,
                                     Channel channel) throws InterruptedException, IOException {
        // 获取到队列消息，因为发送是JSON格式，我们要解析对象格式
        // message.getBody()：存储消息的具体内容（序列化后的二进制数据）
        String msgJsonStr = new String(message.getBody(), StandardCharsets.UTF_8);
        MessageSendDTO msg = JSONObject.parseObject(msgJsonStr, MessageSendDTO.class);
        //假设消费者A处理消息慢，每8秒处理一条
        Thread.sleep(8000);
        log.info("A：消息由消费者A消费：{}，并消费完成", msg);
        // 手动确认，注：这个deliveryTag可以通过message.getMessageProperties().getDeliveryTag()拿到
        channel.basicAck(deliveryTag, false);
    }

    /***
     * 消费者B（监听）队列queueAName
     * @param msgData 传递的具体消息，最好是生产者发送使用什么类型，这里接收就用什么类型
     * @param deliveryTag 处理消息的编号（可通过message.getMessageProperties().getDeliveryTag()拿到）
     * @param message 这个就类似我们原生的message
     * @param channel 这个就类似我们原生的channel
     */
    @RabbitListener(queues = {RabbitMQConfig.QUEUE_A_NAME}, ackMode = "MANUAL")
    public void messageSimpleHandleB(@Payload String msgData, // 这个是生产者发送的JSON消息
                                     @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, // 处理消息的编号
                                     Message message,
                                     Channel channel) throws InterruptedException, IOException {
        // 获取到队列消息，因为发送是JSON格式，我们要解析对象格式
        // message.getBody()：存储消息的具体内容（序列化后的二进制数据）
        String msgJsonStr = new String(message.getBody(), StandardCharsets.UTF_8);
        MessageSendDTO msg = JSONObject.parseObject(msgJsonStr, MessageSendDTO.class);
        // 假设消费者B处理消息快，每2秒处理一条
        Thread.sleep(2000);

        //模拟判断我是否需要手动确认(若随机不是2则确认消费，否则拒绝，继续交由队列)
        if (Math.ceil(Math.random() * 4) != 2) {
            log.info("B：消息由消费者B消费：{}，并消费完成", msg);
            //手动确认
            channel.basicAck(deliveryTag, false);
        } else {
            log.info("B：消息由消费者B消费：{}，并消费失败，丢回队列", msg);
            // 消息编号我们也可以通过message取出来，不用deliveryTag，在message可以获取更多的信息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }

// 非手动确认代码（注释）
//    /***
//     * 消费者A（监听）队列queueAName
//     * @param msgData 传递的具体消息，最好是生产者发送使用什么类型，这里接收就用什么类型
//     * @param message 这个就类似我们原生的message
//     * @param channel 这个就类似我们原生的channel
//     */
//    // @RabbitListener监听队列；若监听多个则在queues的{}里面逗号分割；
//    @RabbitListener(queues = {RabbitMQConfig.QUEUE_A_NAME})
//    public void messageSimpleHandleA(@Payload String msgData, // 这个是生产者发送的JSON消息
//                                     Message message,
//                                     Channel channel) throws InterruptedException {
//        // 获取到队列消息，因为发送是JSON格式，我们要解析对象格式
//        // message.getBody()：存储消息的具体内容（序列化后的二进制数据）
//        String msgJsonStr = new String(message.getBody(), StandardCharsets.UTF_8);
//        MessageSendDTO msg = JSONObject.parseObject(msgJsonStr, MessageSendDTO.class);
//        //假设消费者A处理消息慢，每8秒处理一条
//        Thread.sleep(8000);
//        log.info("A：消息由消费者A消费：{}，并消费完成", msg);
//    }
//
//    /***
//     * 消费者B（监听）队列queueAName
//     * @param msgData 传递的具体消息，最好是生产者发送使用什么类型，这里接收就用什么类型
//     * @param message 这个就类似我们原生的message
//     * @param channel 这个就类似我们原生的channel
//     */
//    @RabbitListener(queues = {RabbitMQConfig.QUEUE_A_NAME})
//    public void messageSimpleHandleB(@Payload String msgData, // 这个是生产者发送的JSON消息
//                                     Message message,
//                                     Channel channel) throws InterruptedException {
//        // 获取到队列消息，因为发送是JSON格式，我们要解析对象格式
//        // message.getBody()：存储消息的具体内容（序列化后的二进制数据）
//        String msgJsonStr = new String(message.getBody(), StandardCharsets.UTF_8);
//        MessageSendDTO msg = JSONObject.parseObject(msgJsonStr, MessageSendDTO.class);
//        // 假设消费者B处理消息快，每2秒处理一条
//        Thread.sleep(2000);
//        log.info("B：消息由消费者B消费：{}，并消费完成", msg);
//    }
}
